SageMakerのバッチ変換でPCAを実行してみた
データアナリティクス事業本部の鈴木です。
SageMakerのバッチ変換を使ってみたかったので、SageMaker Studioから試してみました。
バッチ変換とは
データセットを前処理や、大規模なデータセットに対する推論、永続的な推論エンドポイントが不要なときに使う機能になります。
よくJupyter Notebookなどでモデルの訓練を手動で実行し、テスト用の入力を使って推論結果を出力してみたりしますが、それをそのままSageMakerでしたいときに使うものですね。
バッチ変換に関しては以下のドキュメントに詳細の説明があります。
今回は『PCA および DBSCAN ムービークラスターを使用したバッチ変換』のサンプルを試してみました。上記ドキュメントのバッチ変換のサンプルノートブックにて紹介されているものになります。
バッチ変換など、SageMakerの推論の仕組みの使い分けは、以下のドキュメントに整理されているのでご確認ください。
前提
今回はSageMaker Studioから実行しました。実験に必要なドメイン・ユーザー・実行ロールは準備ができているものとします。
訓練およびテストデータはサンプルに記載の通り、Amazon Customer Reviews Datasetを使用しました。このデータはパブリックなS3バケット上から取得できます。推論結果などは自アカウントのバケットに保存したいため、そのためのバケットは自分で作成しました。
やってみた
1. Notebookの作成
SageMaker Studioへアクセスしました。
File
よりNotebook
を新規作成しました。
カーネルを起動するために、リソースの設定を聞かれました。以下のデフォルト設定のままSelect
を押すと起動しました。
2. ライブラリおよびデータの読み込み
以降は、基本的に記事執筆時点で『PCA および DBSCAN ムービークラスターを使用したバッチ変換』に紹介されているコードを引用しています。試した結果など補足部分をあわせて記載します。
ライブラリの読み込み
まずライブラリをインポートしました。出力先のバケットは自分で指定したかったので、『Amazon SageMakerでS3のどこに何が出力されるかをちゃんと制御したい | DevelopersIO』を参考にsagemaker.Session
で指定しました。
import sagemaker sess = sagemaker.Session(default_bucket="作成した出力先S3バケット名") bucket = sess.default_bucket() prefix = "sagemaker/DEMO-batch-transform" role = sagemaker.get_execution_role() import boto3 import sagemaker import sagemaker.amazon.common as smac from sagemaker import image_uris from sagemaker.transformer import Transformer from sagemaker.deserializers import JSONDeserializer from sagemaker.serializers import CSVSerializer import matplotlib.pyplot as plt import pandas as pd import numpy as np import scipy.sparse import os import json
なお、sagemakerライブラリのバージョンは2.145.0
でした。
sagemaker.__version__ # 以下は表示結果 # '2.145.0'
データの取得と読み込み
Amazon Customer Reviews DatasetをパブリックのS3バケットから取得しました。
!mkdir /tmp/reviews/ !aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz /tmp/reviews/
取得したデータをデータフレームとして読み込み、データの概要を確認しました。
df = pd.read_csv( "/tmp/reviews/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz", delimiter="\t", error_bad_lines=False, ) df.head() df = df[["customer_id", "product_id", "star_rating", "product_title"]]
df.head()
では以下のように表示されました。
データの傾向の確認
customer_id
とproduct_id
についてはユニークな値の個数のパーセンタイルを取得して分布について確認していました。
customers = df["customer_id"].value_counts() products = df["product_id"].value_counts() quantiles = [ 0, 0.1, 0.25, 0.5, 0.75, 0.8, 0.85, 0.9, 0.95, 0.96, 0.97, 0.98, 0.99, 0.995, 0.999, 0.9999, 1, ] print("customers\n", customers.quantile(quantiles)) print("products\n", products.quantile(quantiles)) # 以下は表示結果 # customers # 0.0000 1.0 # 0.1000 1.0 # 0.2500 1.0 # 0.5000 1.0 # 0.7500 2.0 # 0.8000 2.0 # 0.8500 3.0 # 0.9000 4.0 # 0.9500 5.0 # 0.9600 6.0 # 0.9700 7.0 # 0.9800 9.0 # 0.9900 13.0 # 0.9950 18.0 # 0.9990 37.0 # 0.9999 97.0 # 1.0000 2704.0 # Name: customer_id, dtype: float64 # products # 0.0000 1.000 # 0.1000 1.000 # 0.2500 1.000 # 0.5000 3.000 # 0.7500 9.000 # ... # 0.9990 1993.901 # 0.9999 7522.637 # 1.0000 32790.000 # Name: product_id, dtype: float64
3. 訓練・テストデータ作成
データを加工し、PCAを適用するのに適した形にしました。
customers = customers[customers >= 35] products = products[products >= 20] reduced_df = df.merge(pd.DataFrame({"customer_id": customers.index})).merge( pd.DataFrame({"product_id": products.index}) ) customers = reduced_df["customer_id"].value_counts() products = reduced_df["product_id"].value_counts() test_products = products.sample(frac=0.005) train_products = products[~(products.index.isin(test_products.index))] customer_index = pd.DataFrame( {"customer_id": customers.index, "user": np.arange(customers.shape[0])} ) train_product_index = pd.DataFrame( {"product_id": train_products.index, "item": np.arange(train_products.shape[0])} ) test_product_index = pd.DataFrame( {"product_id": test_products.index, "item": np.arange(test_products.shape[0])} ) train_df = reduced_df.merge(customer_index).merge(train_product_index) test_df = reduced_df.merge(customer_index).merge(test_product_index)
S3に加工したデータをアップロードしました。
train_sparse = scipy.sparse.csr_matrix( ( np.where(train_df["star_rating"].values >= 4, 1, 0), (train_df["item"].values, train_df["user"].values), ), shape=(train_df["item"].nunique(), customers.count()), ) test_sparse = scipy.sparse.csr_matrix( ( np.where(test_df["star_rating"].values >= 4, 1, 0), (test_df["item"].values, test_df["user"].values), ), shape=(test_df["item"].nunique(), customers.count()), ) np.savetxt("/tmp/reviews/train.csv", train_sparse.todense(), delimiter=",", fmt="%i") np.savetxt("/tmp/reviews/test.csv", test_sparse.todense(), delimiter=",", fmt="%i") train_s3 = sess.upload_data( "/tmp/reviews/train.csv", bucket=bucket, key_prefix="{}/pca/train".format(prefix) ) test_s3 = sess.upload_data( "/tmp/reviews/test.csv", bucket=bucket, key_prefix="{}/pca/test".format(prefix) )
ここまででS3バケットにデータが出力されたことを確認できました。
4. モデルのトレーニング
Use a SageMaker estimator to run a training jobも参考にしつつ、Estimator
のAPIを使ってモデルを訓練しました。
train_inputs = sagemaker.inputs.TrainingInput(train_s3, content_type="text/csv;label_size=0") container = image_uris.retrieve(framework='pca',region=boto3.Session().region_name) pca = sagemaker.estimator.Estimator( container, role, instance_count=1, instance_type="ml.m4.xlarge", output_path="s3://{}/{}/pca/output".format(bucket, prefix), sagemaker_session=sess, ) pca.set_hyperparameters( feature_dim=customers.count(), num_components=100, subtract_mean=True, algorithm_mode="randomized", mini_batch_size=500, ) pca.fit({"train": train_inputs}) # 以下は表示結果 # INFO:sagemaker:Creating training-job with name: pca-2023-05-21-05-18-47-173 # 2023-05-21 05:18:47 Starting - Starting the training job... # 2023-05-21 05:19:12 Starting - Preparing the instances for training......... # 2023-05-21 05:20:30 Downloading - Downloading input data... # 2023-05-21 05:21:00 Training - Downloading the training image...... # 2023-05-21 05:22:05 Training - Training image download completed. Training in progress..Docker entrypoint called with argument(s): train # Running default environment configuration script # (略) # ... # 2023-05-21 05:22:52 Uploading - Uploading generated training model # 2023-05-21 05:22:52 Completed - Training job completed # Training seconds: 142 # Billable seconds: 142
モデルアーティファクトがS3バケットに出力されました。
トレーニングジョブから個別の訓練の詳細を確認することができました。スクロールするともっと多くの情報を確認できました。
ここまでの操作で、訓練データに対して適用するPCAの具体的な変換を定めることができました。
5. 訓練データへのPCAの実行
訓練データに対して、先ほど作成した変換を適用します。ここで初めてバッチ変換が登場します。
pca_transformer = pca.transformer( instance_count=1, instance_type="ml.m4.xlarge", strategy="MultiRecord", assemble_with="Line", output_path="s3://{}/{}/pca/transform/train".format(bucket, prefix), ) # 以下は表示結果 # INFO:sagemaker:Creating model with name: pca-2023-05-21-05-23-31-581 pca_transformer.transform(train_s3, content_type="text/csv", split_type="Line") pca_transformer.wait() # INFO:sagemaker:Creating transform job with name: pca-2023-05-21-05-23-39-772 # ........................................Docker entrypoint called with argument(s): serve # Docker entrypoint called with argument(s): serve # Running default environment configuration script # Running default environment configuration script # [05/21/2023 05:30:24 INFO 139673570338624] loaded entry point class algorithm.serve.server_config:config_api # [05/21/2023 05:30:24 INFO 139673570338624] loaded entry point class algorithm.serve.server_config:config_api # [05/21/2023 05:30:24 INFO 139673570338624] nvidia-smi: took 0.032 seconds to run. # (略) # ... # [05/21/2023 05:30:33 INFO 139673570338624] The default executor is <PCAModel on cpu(0)>. # [05/21/2023 05:30:33 INFO 139673570338624] <PCAModel on cpu(0)> is assigned to batch slice from 0 to 1300. # #metrics {"StartTime": 1684647032.8784883, "EndTime": 1684647033.9421868, "Dimensions": {"Algorithm": "AlgorithmModel", "Host": "UNKNOWN", # "Operation": "scoring"}, "Metrics": {"invocations.count": {"sum": 1.0, "count": 1, "min": 1, "max": 1}}} # #metrics {"StartTime": 1684647033.1706922, "EndTime": 1684647034.0191453, "Dimensions": {"Algorithm": "AlgorithmModel", "Host": "UNKNOWN", # "Operation": "scoring"}, "Metrics": {"invocations.count": {"sum": 1.0, "count": 1, "min": 1, "max": 1}}}
以下のようにS3バケットに変換結果が出力されました。
実行したバッチ変換は、バッチ変換ジョブ
画面から確認できました。以下の画像は訓練データおよびテストデータ向けに実行した後なので2件分載っています。
6. テストデータへのPCAの実行
テストデータに対して、先ほど作成した変換を適用します。ここではsagemaker.transformer.Transformer
のAPIを使いました。
pca_model = sess.create_model_from_job( pca._current_job_name, name="{}-test".format(pca._current_job_name) ) pca_test_transformer = Transformer( pca_model, 1, "ml.m4.xlarge", output_path="s3://{}/{}/pca/transform/test".format(bucket, prefix), sagemaker_session=sess, strategy="MultiRecord", assemble_with="Line", ) pca_test_transformer.transform(test_s3, content_type="text/csv", split_type="Line") pca_test_transformer.wait() # 以下は表示結果 # INFO:sagemaker:Creating transform job with name: pca-2023-05-21-05-31-25-244 # ......................................Docker entrypoint called with argument(s): serve # Running default environment configuration script # Docker entrypoint called with argument(s): serve # Running default environment configuration script # [05/21/2023 05:37:45 INFO 140519090681664] loaded entry point class algorithm.serve.server_config:config_api # (略) # ... # [05/21/2023 05:37:48 INFO 140519090681664] The default executor is <PCAModel on cpu(0)>. # [05/21/2023 05:37:48 INFO 140519090681664] <PCAModel on cpu(0)> is assigned to batch slice from 0 to 93. # #metrics {"StartTime": 1684647468.3064656, "EndTime": 1684647468.5905228, "Dimensions": {"Algorithm": "AlgorithmModel", "Host": "UNKNOWN", "Operation": "scoring"}, # # # "Metrics": {"invocations.count": {"sum": 1.0, "count": 1, "min": 1, "max": 1}}} # 2023-05-21T05:37:48.320:[sagemaker logs]: MaxConcurrentTransforms=4, MaxPayloadInMB=6, BatchStrategy=MULTI_RECORD
以下のようにS3バケットに変換結果が出力されました。
7. 片づけ
処理を実行するインスタンスを停止しました。
これはStudioのRUNNING INSTANCES
からか、
ユーザーの詳細
から削除できました。
削除されると、ステータスがDeleted
になりました。
Experimentsの確認
Experimentsでは、今回のサンプル通りに処理を実行すると、 Unassigned runsに処理が記録されました。
パラメータなども確認できました。
最後に
今回は『PCA および DBSCAN ムービークラスターを使用したバッチ変換』に沿って、PCAを例にSageMakerのバッチ変換を試してみました。
バッチ変換を使うと、SageMaker Studioなどから変換用のインスタンスを起動し、そのインスタンス上で変換を実行できるのでとても便利ですね。
バッチ変換の実行のされ方のイメージを掴みたい方の参考になりましたら幸いです。